// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
	"context"
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/pkg/keyspace"
	"github.com/pingcap/tidb/pkg/store/mockstore"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// The code below provides helper utilities for the test cases.

// newGetTiFlashSystemTableRequestMocker returns a mocker whose handler table
// starts out empty. The given t is kept on the mocker so that it can fail the
// test from inside SendRequest.
func newGetTiFlashSystemTableRequestMocker(t *testing.T) *getTiFlashSystemTableRequestMocker {
	mocker := new(getTiFlashSystemTableRequestMocker)
	mocker.t = t
	mocker.handlers = map[string]func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error){}
	return mocker
}

// getTiFlashSystemTableRequestMocker wraps a tikv.Client and answers
// CmdGetTiFlashSystemTable requests from registered handlers instead of
// sending them to the wrapped client.
//
// The embedded tikv.Client is the client that all other requests are forwarded
// to; it is assigned by the option returned from AsOpt. t is the test that is
// failed when a system table request arrives with no matching handler.
// handlers maps the exact SQL text of a request to the function that produces
// its response.
type getTiFlashSystemTableRequestMocker struct {
	tikv.Client
	t        *testing.T
	handlers map[string]func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error)
}

// SendRequest intercepts CmdGetTiFlashSystemTable requests and serves them from
// the handler registered for the request's SQL text. A handler error is
// returned to the caller as-is; otherwise the handler's response is wrapped in
// a tikvrpc.Response. If no handler is registered for the SQL text, the test is
// failed via require.Fail. Requests of any other type are forwarded to the
// wrapped client unchanged.
func (client *getTiFlashSystemTableRequestMocker) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	// Anything that is not a TiFlash system table request is none of our business.
	if req.Type != tikvrpc.CmdGetTiFlashSystemTable {
		return client.Client.SendRequest(ctx, addr, req, timeout)
	}

	sql := req.Req.(*kvrpcpb.TiFlashSystemTableRequest).Sql
	handler, found := client.handlers[sql]
	if !found {
		// No handler matches this query, so the test must fail.
		require.Fail(client.t, fmt.Sprintf("Received request %s but no matching handler, maybe caused by unexpected query", sql))
		return client.Client.SendRequest(ctx, addr, req, timeout)
	}

	resp, err := handler(req.GetTiFlashSystemTable())
	if err != nil {
		return nil, err
	}
	return &tikvrpc.Response{Resp: resp}, nil
}

// MockQuery registers fn as the handler for requests whose SQL text equals
// query exactly, replacing any handler previously registered for that query.
// It returns the mocker itself so that calls can be chained.
func (client *getTiFlashSystemTableRequestMocker) MockQuery(query string, fn func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error)) *getTiFlashSystemTableRequestMocker {
	client.handlers[query] = fn
	return client
}

// AsOpt turns the mocker into a mock store option built with
// mockstore.WithClientHijacker. The hijacker callback stores the client it is
// given in the mocker's embedded Client field and returns the mocker in its
// place.
func (client *getTiFlashSystemTableRequestMocker) AsOpt() mockstore.MockTiKVStoreOption {
	hijack := func(inner tikv.Client) tikv.Client {
		client.Client = inner
		return client
	}
	return mockstore.WithClientHijacker(hijack)
}

// systemKeyspaceID (0xFFFFFE) is the keyspace ID that the tests in this file
// put into the keyspace_id filter of the expected TiFlash queries when a
// keyspace name is configured.
// NOTE(review): presumably this mirrors the ID reserved for the system
// keyspace elsewhere in the project — confirm.
var systemKeyspaceID = uint32(16777214)

// TestTiFlashSystemTableWithTiFlashV620 reads information_schema.TIFLASH_SEGMENTS
// and information_schema.TIFLASH_TABLES while the mocked TiFlash instance
// answers with the payloads stored in testdata/tiflash_v620_dt_segments.json
// and testdata/tiflash_v620_dt_tables.json, and checks the resulting rows and
// that no warnings are produced.
func TestTiFlashSystemTableWithTiFlashV620(t *testing.T) {
	instances := []string{
		"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
		"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
	}
	fpName := "github.com/pingcap/tidb/pkg/infoschema/mockStoreServerInfo"
	fpExpr := `return("` + strings.Join(instances, ";") + `")`
	require.NoError(t, failpoint.Enable(fpName, fpExpr))
	defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

	// Generate the tiflash system table request mocker.
	mocker := newGetTiFlashSystemTableRequestMocker(t)

	// The expected query carries a keyspace_id filter only when a keyspace name is configured.
	var sqlTiFlashSegment string
	if keyspace.GetKeyspaceNameBySettings() == "" {
		sqlTiFlashSegment = `SELECT * FROM system.dt_segments LIMIT 0, 1024`
	} else {
		sqlTiFlashSegment = fmt.Sprintf(`SELECT * FROM system.dt_segments WHERE keyspace_id=%d LIMIT 0, 1024`, systemKeyspaceID)
	}
	mocker.MockQuery(sqlTiFlashSegment, func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) {
		// testify expects (expected, actual); keep that order so failure output is labelled correctly.
		require.Equal(t, sqlTiFlashSegment, req.Sql)
		data, err := os.ReadFile("testdata/tiflash_v620_dt_segments.json")
		require.NoError(t, err)
		return &kvrpcpb.TiFlashSystemTableResponse{
			Data: data,
		}, nil
	})

	var sqlTiFlashTable string
	if keyspace.GetKeyspaceNameBySettings() == "" {
		sqlTiFlashTable = `SELECT * FROM system.dt_tables LIMIT 0, 1024`
	} else {
		sqlTiFlashTable = fmt.Sprintf(`SELECT * FROM system.dt_tables WHERE keyspace_id=%d LIMIT 0, 1024`, systemKeyspaceID)
	}
	mocker.MockQuery(sqlTiFlashTable, func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) {
		require.Equal(t, sqlTiFlashTable, req.Sql)
		data, err := os.ReadFile("testdata/tiflash_v620_dt_tables.json")
		require.NoError(t, err)
		return &kvrpcpb.TiFlashSystemTableResponse{
			Data: data,
		}, nil
	})

	// When parsing the result from an old-version TiFlash instance with the latest column names, some columns may not exist.
	// The missing columns will be filled with <nil> values.
	store := testkit.CreateMockStore(t, withMockTiFlash(1), mocker.AsOpt())
	tk := testkit.NewTestKit(t, store)
	tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
		"mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 <nil> 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
		"mysql db 8 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 <nil> 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
		"test segment 70 0 1 [01,FA) <nil> 30511 50813627 0.6730359542460096 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 3578860 <nil> 409336 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
	))
	tk.MustQuery("show warnings").Check(testkit.Rows())

	tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
		"mysql tables_priv 10 0 <nil> 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"mysql db 8 0 <nil> 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"test segment 70 0 <nil> 1 102000 169873868 0 0 0 <nil> 0 <nil> <nil> <nil> 0 102000 169873868 0 0 0 <nil> <nil> <nil> 1 102000 169873868 43867622 102000 169873868 0 <nil> <nil> <nil> <nil> 13 13 7846.153846153846 13067220.615384616 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
	))
	tk.MustQuery("show warnings").Check(testkit.Rows())
}

// TestTiFlashSystemTableWithTiFlashV630 reads information_schema.TIFLASH_SEGMENTS
// while the mocked TiFlash instance answers with the payload stored in
// testdata/tiflash_v630_dt_segments.json, and checks the resulting rows and
// that no warnings are produced.
func TestTiFlashSystemTableWithTiFlashV630(t *testing.T) {
	instances := []string{
		"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
		"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
	}
	fpName := "github.com/pingcap/tidb/pkg/infoschema/mockStoreServerInfo"
	fpExpr := `return("` + strings.Join(instances, ";") + `")`
	require.NoError(t, failpoint.Enable(fpName, fpExpr))
	defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

	// Generate the tiflash system table request mocker.
	mocker := newGetTiFlashSystemTableRequestMocker(t)

	// The expected query carries a keyspace_id filter only when a keyspace name is configured.
	var sqlTiFlashSegment string
	if keyspace.GetKeyspaceNameBySettings() == "" {
		sqlTiFlashSegment = `SELECT * FROM system.dt_segments LIMIT 0, 1024`
	} else {
		sqlTiFlashSegment = fmt.Sprintf(`SELECT * FROM system.dt_segments WHERE keyspace_id=%d LIMIT 0, 1024`, systemKeyspaceID)
	}
	mocker.MockQuery(sqlTiFlashSegment, func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) {
		// testify expects (expected, actual); keep that order so failure output is labelled correctly.
		require.Equal(t, sqlTiFlashSegment, req.Sql)
		data, err := os.ReadFile("testdata/tiflash_v630_dt_segments.json")
		require.NoError(t, err)
		return &kvrpcpb.TiFlashSystemTableResponse{
			Data: data,
		}, nil
	})

	// When parsing the result from an old-version TiFlash instance with the latest column names, some columns may not exist.
	// The missing columns will be filled with <nil> values.
	store := testkit.CreateMockStore(t, withMockTiFlash(1), mocker.AsOpt())
	tk := testkit.NewTestKit(t, store)
	tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
		"mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) 0 0 0 <nil> 0 0 0 0 2 0 0 0 0 0 <nil> 2032 3 0 0 1 1 0 0 0 0 127.0.0.1:3933",
		"test segment 70 436272981189328904 1 [01,FA) 5 102000 169874232 0 0 0 0 0 2 0 0 0 0 0 <nil> 2032 3 102000 169874232 1 68 102000 169874232 43951837 20 127.0.0.1:3933",
		"test segment 75 0 1 [01,013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8) 2 0 0 <nil> 0 0 1 1 110 0 0 4 4 0 <nil> 2032 111 0 0 1 70 0 0 0 0 127.0.0.1:3933",
		"test segment 75 0 113 [013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8,013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9) 2 10167 16932617 0.4887380741615029 0 0 0 0 114 4969 8275782 2 0 0 <nil> 63992 112 5198 8656835 1 71 5198 8656835 2254100 1 127.0.0.1:3933",
		"test segment 75 0 116 [013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9,013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8) 3 8 13322 0.5 3 4986 1 0 117 1 1668 4 3 4986 <nil> 2032 115 4 6668 1 78 4 6668 6799 1 127.0.0.1:3933",
		"test segment 75 0 125 [013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8,013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9) 2 8677 14451079 0.4024432407514118 3492 5816059 3 0 126 0 0 0 0 5816059 <nil> 2032 124 5185 8635020 1 79 5185 8635020 2247938 1 127.0.0.1:3933",
		"test segment 75 0 128 [013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9,013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8) 0 1 1668 1 0 0 0 0 129 1 1668 5 4 0 <nil> 2032 127 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
		"test segment 75 0 119 [013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8,013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9) 2 10303 17158730 0.489372027564787 0 0 0 0 120 5042 8397126 2 0 0 <nil> 63992 118 5261 8761604 1 77 5261 8761604 2280506 1 127.0.0.1:3933",
		"test segment 75 0 122 [013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9,FA) 0 1 1663 1 0 0 0 0 123 1 1663 4 3 0 <nil> 2032 121 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
	))
	tk.MustQuery("show warnings").Check(testkit.Rows())
}

// TestTiFlashSystemTableWithTiFlashV640 reads information_schema.TIFLASH_TABLES
// while the mocked TiFlash instance answers with the payload stored in
// testdata/tiflash_v640_dt_tables.json, and checks the resulting rows and that
// no warnings are produced.
func TestTiFlashSystemTableWithTiFlashV640(t *testing.T) {
	instances := []string{
		"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
		"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
	}
	fpName := "github.com/pingcap/tidb/pkg/infoschema/mockStoreServerInfo"
	fpExpr := `return("` + strings.Join(instances, ";") + `")`
	require.NoError(t, failpoint.Enable(fpName, fpExpr))
	defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

	// Generate the tiflash system table request mocker.
	mocker := newGetTiFlashSystemTableRequestMocker(t)

	// The expected query carries a keyspace_id filter only when a keyspace name is configured.
	var sqlTiFlashTable string
	if keyspace.GetKeyspaceNameBySettings() == "" {
		sqlTiFlashTable = `SELECT * FROM system.dt_tables LIMIT 0, 1024`
	} else {
		sqlTiFlashTable = fmt.Sprintf(`SELECT * FROM system.dt_tables WHERE keyspace_id=%d LIMIT 0, 1024`, systemKeyspaceID)
	}
	mocker.MockQuery(sqlTiFlashTable, func(req *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) {
		// testify expects (expected, actual); keep that order so failure output is labelled correctly.
		require.Equal(t, sqlTiFlashTable, req.Sql)
		data, err := os.ReadFile("testdata/tiflash_v640_dt_tables.json")
		require.NoError(t, err)
		return &kvrpcpb.TiFlashSystemTableResponse{
			Data: data,
		}, nil
	})

	// When parsing the result from an old-version TiFlash instance with the latest column names, some columns may not exist.
	// The missing columns will be filled with <nil> values.
	store := testkit.CreateMockStore(t, withMockTiFlash(1), mocker.AsOpt())
	tk := testkit.NewTestKit(t, store)
	tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
		"tpcc customer 135 0 <nil> 4 3528714 2464079200 0 0.002329177144988231 1 0 929227 <nil> 0.16169850346757514 0 8128 882178.5 616019800 4 8219 5747810 2054.75 1436952.5 0 4 3520495 2458331390 1601563417 880123.75 614582847.5 24 8 6 342.4583333333333 239492.08333333334 482 120.5 7303.9315352697095 5100272.593360996 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc district 137 0 <nil> 1 7993 1346259 0 0.8748905292130614 1 0.8055198055198055 252168 <nil> 0.21407121407121407 0 147272 7993 1346259 1 6993 1178050 6993 1178050 0 1 1000 168209 91344 1000 168209 6 6 6 1165.5 196341.66666666666 10 10 100 16820.9 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc history 139 0 <nil> 19 19379697 1629276978 0 0.0006053758219233252 0.5789473684210527 0.4626662120695534 253640 <nil> 0.25434708489601093 0 293544 1019984.052631579 85751419.89473684 11 11732 997220 1066.5454545454545 90656.36363636363 0 19 19367965 1628279758 625147717 1019366.5789473684 85698934.63157895 15 4 1.3636363636363635 782.1333333333333 66481.33333333333 2378 125.15789473684211 8144.644659377628 684726.559293524 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc item 141 0 <nil> 1 100000 10799081 0 0 0 <nil> 0 <nil> <nil> <nil> 0 100000 10799081 0 0 0 <nil> <nil> <nil> 1 100000 10799081 7357726 100000 10799081 0 0 <nil> <nil> <nil> 13 13 7692.307692307692 830698.5384615385 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc new_order 143 0 <nil> 4 2717707 78813503 0 0.02266763856442214 1 0.9678592299201351 52809 <nil> 0.029559768846178818 0 1434208 679426.75 19703375.75 4 61604 1786516 15401 446629 0 3 2656103 77026987 40906492 885367.6666666666 25675662.333333332 37 24 9.25 1664.972972972973 48284.21621621621 380 126.66666666666667 6989.744736842105 202702.59736842106 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc order_line 145 0 <nil> 203 210607202 20007684190 0 0.0054566462546708164 0.5862068965517241 0.7810067620424135 620065 <nil> 0.005679558722564825 0 22607144 1037473.9014778325 98560020.64039409 119 1149209 109174855 9657.218487394957 917435.756302521 0 203 209457993 19898509335 8724002804 1031812.7733990147 98022213.47290641 893 39 7.504201680672269 1286.9081746920492 122256.27659574468 31507 155.20689655172413 6647.982765734599 631558.3627447869 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc orders 147 0 <nil> 22 21903301 1270391458 0 0.02021357420052804 0.7272727272727273 0.9239944527763222 260536 <nil> 0.010145817899282655 0 10025264 995604.5909090909 57745066.27272727 16 442744 25679152 27671.5 1604947 0 22 21460557 1244712306 452173775 975479.8636363636 56577832.09090909 242 34 15.125 1829.5206611570247 106112.19834710743 2973 135.13636363636363 7218.485368314833 418672.15136226034 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc stock 149 0 <nil> 42 11112720 4811805131 0 0.028085203262567582 0.9761904761904762 0.8463391893060944 10227093 <nil> 0.07567373591410528 0 6719064 264588.5714285714 114566788.83333333 41 312103 135131097 7612.268292682927 3295880.4146341463 0 42 10800617 4676674034 3231872509 257157.54761904763 111349381.76190476 238 26 5.804878048780488 1311.357142857143 567777.718487395 1644 39.142857142857146 6569.718369829684 2844692.234793187 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
		"tpcc warehouse 151 0 <nil> 1 5842 923615 0 0.9828825744608011 1 0.9669104841518634 70220 <nil> 0.07732497387669801 0 133048 5842 923615 1 5742 907807 5742 907807 0 1 100 15808 11642 100 15808 5 5 5 1148.4 181561.4 5 5 20 3161.6 0 0 0  0 0 0  0 0 0  0 127.0.0.1:3933",
	))
	tk.MustQuery("show warnings").Check(testkit.Rows())
}
